[[sending-messages]]
= Sending Messages

This section covers how to send messages.

[[kafka-template]]
== Using `KafkaTemplate`

This section covers how to use `KafkaTemplate` to send messages.

[[overview]]
=== Overview

The `KafkaTemplate` wraps a producer and provides convenience methods to send data to Kafka topics.
The following listing shows the relevant methods from `KafkaTemplate`:

[source, java]
----
CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}
----

See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail.

IMPORTANT: In version 3.0, the methods that previously returned `ListenableFuture` have been changed to return `CompletableFuture`.
To facilitate the migration, the 2.9 version added a method `usingCompletableFuture()` which provided the same methods with `CompletableFuture` return types; this method is no longer available.

The `sendDefault` API requires that a default topic has been provided to the template.

The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
How the user-provided timestamp is stored depends on the timestamp type configured on the Kafka topic.
If the topic is configured to use `CREATE_TIME`, the user-specified timestamp is recorded (or generated if not specified).
If the topic is configured to use `LOG_APPEND_TIME`, the user-specified timestamp is ignored and the broker adds in the local broker time.

The `metrics` and `partitionsFor` methods delegate to the same methods on the underlying https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Producer.html[`Producer`].
The `execute` method provides direct access to the underlying https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Producer.html[`Producer`].

To use the template, you can configure a producer factory and provide it in the template's constructor.
The following example shows how to do so:

[source, java]
----
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}
----

Starting with version 2.5, you can now override the factory's `ProducerConfig` properties to create templates with different producer configurations from the same factory.

[source, java]
----
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
----

Note that a bean of type `ProducerFactory<?, ?>` (such as the one auto-configured by Spring Boot) can be referenced with different narrowed generic types.

You can also configure the template by using standard `<bean/>` definitions.

Then, to use the template, you can invoke one of its methods.

When you use the methods with a `Message<?>` parameter, the topic, partition, key and timestamp information is provided in a message header that includes the following items:

* `KafkaHeaders.TOPIC`
* `KafkaHeaders.PARTITION`
* `KafkaHeaders.KEY`
* `KafkaHeaders.TIMESTAMP`

The message payload is the data.

Optionally, you can configure the `KafkaTemplate` with a `ProducerListener` to get an asynchronous callback with the results of the send (success or failure) instead of waiting for the `Future` to complete.
The following listing shows the definition of the `ProducerListener` interface:

[source, java]
----
public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}
----

By default, the template is configured with a `LoggingProducerListener`, which logs errors and does nothing when the send is successful.

For convenience, default method implementations are provided in case you want to implement only one of the methods.

Notice that the send methods return a `CompletableFuture<SendResult>`.
You can register a callback with the listener to receive the result of the send asynchronously.
The following example shows how to do so:

[source, java]
----
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});
----

`SendResult` has two properties, a `ProducerRecord` and `RecordMetadata`.
See the Kafka API documentation for information about those objects.

The `Throwable` can be cast to a `KafkaProducerException`; its `producerRecord` property contains the failed record.

If you wish to block the sending thread to await the result, you can invoke the future's `get()` method; using the method with a timeout is recommended.
If you have set a `linger.ms`, you may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send.
Flushing is only needed if you have set the `linger.ms` producer property and want to immediately send a partial batch.

[[examples]]
=== Examples

This section shows examples of sending messages to Kafka:

.Non Blocking (Async)
====
[source, java]
----
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
----

.Blocking (Sync)
[source, java]
----
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}
----
====

Note that the cause of the `ExecutionException` is `KafkaProducerException` with the `producerRecord` property.

[[routing-template]]
== Using `RoutingKafkaTemplate`

Starting with version 2.5, you can use a `RoutingKafkaTemplate` to select the producer at runtime, based on the destination `topic` name.

IMPORTANT: The routing template does **not** support transactions, `execute`, `flush`, or `metrics` operations because the topic is not known for those operations.

The template requires a map of `java.util.regex.Pattern` to `ProducerFactory<Object, Object>` instances.
This map should be ordered (e.g. a `LinkedHashMap`) because it is traversed in order; you should add more specific patterns at the beginning.

The following simple Spring Boot application provides an example of how to use the same template to send to different topics, each using a different value serializer.

[source, java]
----
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}
----

The corresponding `@KafkaListener`+++s+++ for this example are shown in xref:kafka/receiving-messages/listener-annotation.adoc#annotation-properties[Annotation Properties].

For another technique to achieve similar results, but with the additional capability of sending different types to the same topic, see xref:kafka/serdes.adoc#delegating-serialization[Delegating Serializer and Deserializer].

[[producer-factory]]
== Using `DefaultKafkaProducerFactory`

As seen in xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`], a `ProducerFactory` is used to create the producer.

When not using xref:kafka/transactions.adoc[Transactions], by default, the `DefaultKafkaProducerFactory` creates a singleton producer used by all clients, as recommended in the `KafkaProducer` JavaDocs.
However, if you call `flush()` on the template, this can cause delays for other threads using the same producer.
Starting with version 2.3, the `DefaultKafkaProducerFactory` has a new property `producerPerThread`.
When set to `true`, the factory will create (and cache) a separate producer for each thread, to avoid this issue.

IMPORTANT: When `producerPerThread` is `true`, user code **must** call `closeThreadBoundProducer()` on the factory when the producer is no longer needed.
This will physically close the producer and remove it from the `ThreadLocal`.
Calling `reset()` or `destroy()` will not clean up these producers.

Also see xref:kafka/transactions.adoc#tx-template-mixed[`KafkaTemplate` Transactional and non-Transactional Publishing].

When creating a `DefaultKafkaProducerFactory`, key and/or value `Serializer` classes can be picked up from configuration by calling the constructor that only takes in a Map of properties (see example in xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`]), or `Serializer` instances may be passed to the `DefaultKafkaProducerFactory` constructor (in which case all `Producer` s share the same instances).
Alternatively you can provide `Supplier<Serializer>`+++s+++ (starting with version 2.3) that will be used to obtain separate `Serializer` instances for each `Producer`:

[source, java]
----

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

----

Starting with version 2.5.10, you can now update the producer properties after the factory is created.
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
The changes will not affect existing producer instances; call `reset()` to close any existing  producers so that new producers will be created using the new properties.
NOTE: You cannot change a transactional producer factory to non-transactional, and vice-versa.

Two new methods are now provided:

[source, java]
----
void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);
----

Starting with version 2.8, if you provide serializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.

[[replying-template]]
== Using `ReplyingKafkaTemplate`

Version 2.1.3 introduced a subclass of `KafkaTemplate` to provide request/reply semantics.
The class is named `ReplyingKafkaTemplate` and has two additional methods; the following shows the method signatures:

[source, java]
----
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);
----

(Also see xref:kafka/sending-messages.adoc#exchanging-messages[Request/Reply with `Message<?>` s]).

The result is a `CompletableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
You can use this future to determine the result of the send operation.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to  `CompletableFuture`+++s+++ instead of `ListenableFuture`+++s+++.

If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).

Starting with version 2.8.8, the template has a new method `waitForAssignment`.
This is useful if the reply container is configured with `auto.offset.reset=latest` to avoid sending a request and a reply sent before the container is initialized.

IMPORTANT: When using manual partition assignment (no group management), the duration for the wait must be greater than the container's `pollTimeout` property because the notification will not be sent until after the first poll is completed.

The following Spring Boot application shows an example of how to use the feature:

[source, java]
----
@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}
----

Note that we can use Boot's auto-configured container factory to create the reply container.

If a non-trivial deserializer is being used for replies, consider using an xref:kafka/serdes.adoc#error-handling-deserializer[`ErrorHandlingDeserializer`] that delegates to your configured deserializer.
When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property.

Starting with version 2.6.7, in addition to detecting `DeserializationException`+++s+++, the template will call the `replyErrorChecker` function, if provided.
If it returns an exception, the future will be completed exceptionally.

Here is an example:

[source, java]
----
template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}
----

The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.

In this case, the following `@KafkaListener` application responds:

[source, java]
----
@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}
----

The `@KafkaListener` infrastructure echoes the correlation ID and determines the reply topic.

See xref:kafka/receiving-messages/annotation-send-to.adoc[Forwarding Listener Results using `@SendTo`] for more information about sending replies.
The template uses the default header `KafKaHeaders.REPLY_TOPIC` to indicate the topic to which the reply goes.

Starting with version 2.2, the template tries to detect the reply topic or partition from the configured reply container.
If the container is configured to listen to a single topic or a single `TopicPartitionOffset`, it is used to set the reply headers.
If the container is configured otherwise, the user must set up the reply headers.
In this case, an `INFO` log message is written during initialization.
The following example uses `KafkaHeaders.REPLY_TOPIC`:

[source, java]
----
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
----

When you configure with a single reply `TopicPartitionOffset`, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition.
When configuring with a single reply topic, each instance must use a different `group.id`.
In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID.
This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply.
When you use this setting, we recommend that you set the template's `sharedReplyTopic` to `true`, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.

The following is an example of configuring the reply container to use the same shared reply topic:

[source, java]
----
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
----

IMPORTANT: If you have multiple client instances and you do not configure them as discussed in the preceding paragraph, each instance needs a dedicated reply topic.
An alternative is to set the `KafkaHeaders.REPLY_PARTITION` and use a dedicated partition for each instance.
The `Header` contains a four-byte int (big-endian).
The server must use this header to route the reply to the correct partition (`@KafkaListener` does this).
In this case, though, the reply container must not use Kafka's group management feature and must be configured to listen on a fixed partition (by using a `TopicPartitionOffset` in its `ContainerProperties` constructor).

NOTE: The `DefaultKafkaHeaderMapper` requires Jackson to be on the classpath (for the `@KafkaListener`).
If it is not available, the message converter has no header mapper, so you must configure a `MessagingMessageConverter` with a `SimpleKafkaHeaderMapper`, as shown earlier.

By default, 3 headers are used:

* `KafkaHeaders.CORRELATION_ID` - used to correlate the reply to a request
* `KafkaHeaders.REPLY_TOPIC` - used to tell the server where to reply
* `KafkaHeaders.REPLY_PARTITION` - (optional) used to tell the server which partition to reply to

These header names are used by the `@KafkaListener` infrastructure to route the reply.

Starting with version 2.3, you can customize the header names - the template has 3 properties `correlationHeaderName`, `replyTopicHeaderName`, and `replyPartitionHeaderName`.
This is useful if your server is not a Spring application (or does not use the `@KafkaListener`).

NOTE: Conversely, if the requesting application is not a spring application and puts correlation information in a different header, starting with version 3.0, you can configure a custom `correlationHeaderName` on the listener container factory and that header will be echoed back.
Previously, the listener had to echo custom correlation headers.

[[exchanging-messages]]
=== Request/Reply with `Message<?>`+++s+++

Version 2.7 added methods to the `ReplyingKafkaTemplate` to send and receive `spring-messaging` 's `Message<?>` abstraction:

[source, java]
----
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);
----

These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to  `CompletableFuture`+++s+++ instead of `ListenableFuture`+++s+++.

Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.

Use the second method if you need to provide type information for the return type, to assist the message converter.
This also allows the same template to receive different types, even if there is no type metadata in the replies, such as when the server side is not a Spring application.
The following is an example of the latter:

.Template Bean
[tabs]
======
Java::
+
[source, java, role="primary", indent=0]
----
include::{java-examples}/requestreply/Application.java[tag=beans]
----

Kotlin::
+
[source, kotlin, role="secondary",indent=0]
----
include::{kotlin-examples}/requestreply/Application.kt[tag=beans]
----
======

.Using the template
[tabs]
======
Java::
+
[source, java, role="primary", indent=0]
----
include::{java-examples}/requestreply/Application.java[tag=sendReceive]
----

Kotlin::
+
[source, kotlin, role="secondary", indent=0]
----
include::{kotlin-examples}/requestreply/Application.kt[tag=sendReceive]
----
======

[[reply-message]]
== Reply Type Message<?>

When the `@KafkaListener` returns a `Message<?>`, with versions before 2.5, it was necessary to populate the reply topic and correlation id headers.
In this example, we use the reply topic header from the request:

[source, java]
----
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}
----

This also shows how to set a key on the reply record.

Starting with version 2.5, the framework will detect if these headers are missing and populate them with the topic - either the topic determined from the `@SendTo` value or the incoming `KafkaHeaders.REPLY_TOPIC` header (if present).
It will also echo the incoming `KafkaHeaders.CORRELATION_ID` and `KafkaHeaders.REPLY_PARTITION`, if present.

[source, java]
----
@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}
----

[[aggregating-request-reply]]
== Aggregating Multiple Replies

The template in xref:kafka/sending-messages.adoc#replying-template[Using `ReplyingKafkaTemplate`] is strictly for a single request/reply scenario.
For cases where multiple receivers of a single message return a reply, you can use the `AggregatingReplyingKafkaTemplate`.
This is an implementation of the client-side of the https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html[Scatter-Gather Enterprise Integration Pattern].

Like the `ReplyingKafkaTemplate`, the `AggregatingReplyingKafkaTemplate` constructor takes a producer factory and a listener container to receive the replies; it has a third parameter `BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy` which is consulted each time a reply is received; when the predicate returns `true`, the collection of `ConsumerRecord`+++s+++ is used to complete the `Future` returned by the `sendAndReceive` method.

There is an additional property `returnPartialOnTimeout` (default false).
When this is set to `true`, instead of completing the future with a `KafkaReplyTimeoutException`, a partial result completes the future normally (as long as at least one reply record has been received).

Starting with version 2.3.5, the predicate is also called after a timeout (if `returnPartialOnTimeout` is `true`).
The first argument is the current list of records; the second is `true` if this call is due to a timeout.
The predicate can modify the list of records.

[source, java]
----
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);
----

Notice that the return type is a `ConsumerRecord` with a value that is a collection of `ConsumerRecord`+++s+++.
The "outer" `ConsumerRecord` is not a "real" record, it is synthesized by the template, as a holder for the actual reply records received for the request.
When a normal release occurs (release strategy returns true), the topic is set to `aggregatedResults`; if `returnPartialOnTimeout` is true, and timeout occurs (and at least one reply record has been received), the topic is set to `partialResultsAfterTimeout`.
The template provides constant static variables for these "topic" names:

[source, java]
----
/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
----

The real `ConsumerRecord`+++s+++ in the `Collection` contain the actual topic(s) from which the replies are received.

IMPORTANT: The listener container for the replies **must** be configured with `AckMode.MANUAL` or `AckMode.MANUAL_IMMEDIATE`; the consumer property `enable.auto.commit` must be `false` (the default since version 2.3).
To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy.
After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies.

NOTE: If you use an xref:kafka/serdes.adoc#error-handling-deserializer[`ErrorHandlingDeserializer`] with this aggregating template, the framework will not automatically detect `DeserializationException`+++s+++.
Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` method to determine if a deserialization exception occurred.
See its JavaDocs for more information.
The `replyErrorChecker` is also not called for this aggregating template; you should perform the checks on each element of the reply.

